package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.Constant;
import com.atguigu.utils.JdbcUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.*;

public class DimTableProcessFunction extends BroadcastProcessFunction<JSONObject, TableProcess, JSONObject> {

    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    private HashMap<String, TableProcess> tableProcessHashMap;

    public DimTableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        //初始化HashMap
        tableProcessHashMap = new HashMap<>();

        Connection connection = DriverManager.getConnection(Constant.MYSQL_URL, "root", "000000");
        List<TableProcess> tableProcessList = JdbcUtil.queryList(connection,
                "select * from table_process where sink_type='dim'",
                TableProcess.class,
                true);

        //遍历集合,将数据放入HashMap
        for (TableProcess tableProcess : tableProcessList) {
            tableProcessHashMap.put(tableProcess.getSourceTable(), tableProcess);
        }

        connection.close();
    }

    //value:{"database":"gmall-220623-flink","table":"comment_info","type":"insert","ts":1669162958,"xid":1111,"xoffset":13941,"data":{"id":1595211185799847960,"user_id":119,"nick_name":null,"head_img":null,"sku_id":31,"spu_id":10,"order_id":987,"appraise":"1204","comment_txt":"评论内容：48384811984748167197482849234338563286217912223261","create_time":"2022-08-02 08:22:38","operate_time":null}}
    @Override
    public void processElement(JSONObject value, BroadcastProcessFunction<JSONObject, TableProcess, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {

        System.out.println("processElement>>>" + value);

        //获取状态数据
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        //判断是否存在做过滤数据  行,列
        String table = value.getString("table");
        TableProcess tableProcess = broadcastState.get(table);
        TableProcess mapTableProcess = tableProcessHashMap.get(table);
        String type = value.getString("type");
        if ((tableProcess != null || mapTableProcess != null) && (!"bootstrap-start".equals(type) && !"bootstrap-complete".equals(type))) {

            if (tableProcess == null) {
                tableProcess = mapTableProcess;
            }

            //列过滤
            filterColumns(value.getJSONObject("data"), tableProcess.getSinkColumns());

            //补充信息
            value.put("sinkTable", tableProcess.getSinkTable());
            value.put("rowKeyColumn", tableProcess.getSinkRowKey());
            value.put("splitKey", tableProcess.getSinkExtend());
            value.put("columnFamily", tableProcess.getSinkFamily());

            //输出数据
            out.collect(value);

        } else {
            System.out.println("该表：" + table + "配置信息不存在！或者操作类型不对：" + type);
        }
    }

    /**
     * 过滤字段
     *
     * @param data        {"id":"1001","tm_name":"atguigu","logo_url":"/xx/xx"}
     * @param sinkColumns "id,tm_name"
     */
    private void filterColumns(JSONObject data, String sinkColumns) {

        //处理过滤字段
        String[] split = sinkColumns.split(",");
        List<String> columnList = Arrays.asList(split);

        Set<Map.Entry<String, Object>> entries = data.entrySet();
        entries.removeIf(next -> !columnList.contains(next.getKey()));

//        Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
//        while (iterator.hasNext()) {
//            Map.Entry<String, Object> next = iterator.next();
//            if (!sinkColumns.contains(next.getKey())) {
//                iterator.remove();
//            }
//        }
    }

    @Override
    public void processBroadcastElement(TableProcess tableProcess, BroadcastProcessFunction<JSONObject, TableProcess, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {

        if (tableProcess != null) {
            //获取状态
            BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

            //根据操作类型决定插入数据进状态还是删除
            String op = tableProcess.getOp();
            String sourceTable = tableProcess.getSourceTable();
            if ("d".equals(op)) {
                broadcastState.remove(sourceTable);
                tableProcessHashMap.remove(sourceTable);
            } else {
                broadcastState.put(sourceTable, tableProcess);
            }
        }
    }
}
